package com.hefei.garden.comsumer;

import org.apache.pulsar.client.api.*;

public class ConsumerClient {
    public static void main(String[] args) throws PulsarClientException {

        // 创建Pulsar客户端连接
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://192.168.92.110:6650")
                .build();

        // 创建消费者
        String topicName = "my-topic";
        SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Earliest; // 从最早的消息开始消费
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic(topicName)
                .subscriptionInitialPosition(subscriptionInitialPosition)
                .subscriptionName("my-subscription")
                .subscribe();


        // 接收消息
        while (true) {
            Message<String> message = consumer.receive();
            try {
                System.out.printf("Received message with ID %s and content '%s'%n",
                        message.getMessageId(), message.getValue());
                consumer.acknowledge(message);
            } catch (Exception e) {
                consumer.negativeAcknowledge(message);
            }
        }
    }
}
